package priv.lhy.ecm.collectorProducer.task.job;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import pirv.lhy.ecm.collectorAPI.constants.UploadType;
import priv.lhy.common.utils.JsonUtil;
import priv.lhy.common.utils.MD5Util;
import priv.lhy.common.utils.StringUtil;
import priv.lhy.ecm.collectorProducer.aftertreatment.record.IDispose;
import priv.lhy.ecm.collectorProducer.constants.ExceptionType;
import priv.lhy.ecm.collectorProducer.constants.ReuploadType;
import priv.lhy.ecm.collectorProducer.dataTransfer.AbsTransfer;

import java.io.IOException;

/**
 * @author: lihy
 * date: 2019/6/29 18:05
 * description: 任务抽象类，新建任务只需要继承此并传递不同参数
 */
public abstract class AbsJob implements Job {

    private String tcpHost;

    private int tcpPort;

    private String delimiter;

    private String company;

    private String userCode;

    private UploadType type;

    private Bootstrap bootstrap;

    private AbsTransfer transfer;

    @Autowired
    private ApplicationContext applicationContext;

    @Value("${upload.keepRecord.type}")
    private int keepRecordType;


    @Bean
    private IDispose getDispose(){
        return (IDispose) applicationContext.getBean("disposeBy"
                + StringUtil.firstCharUpperCase(ReuploadType.values()[keepRecordType].name().toLowerCase()));
    }

    public AbsJob(String tcpHost, int tcpPort, String delimiter, String company, String userCode,
                  UploadType type, Bootstrap bootstrap, AbsTransfer transfer) {
        this.tcpHost = tcpHost;
        this.tcpPort = tcpPort;
        this.delimiter = delimiter;
        this.company = company;
        this.userCode = userCode;
        this.type = type;
        this.bootstrap = bootstrap;
        this.transfer = transfer;
    }

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StringBuffer transferSB = new StringBuffer();
        try {
            String jsonStr = JsonUtil.Object2Json(transfer.buildData());

            /**
             * 上传数据格式：
             * token|userCode|uploadType|tableSuffix|jsonData|delimiter
             */
            transferSB.append(MD5Util.getMD5Str(company, userCode)).append("|").append(userCode)
                    .append("|").append(type).append("|").append("").append("|").append(jsonStr)
                    .append("|").append(delimiter);

            ChannelFuture future = bootstrap.connect(tcpHost, tcpPort).sync();
            Channel channel = future.channel();

            //将上传信息作为附件传递，服务端返回错误可进行后续处理
            AttributeKey<String> attKey = AttributeKey.valueOf("uploadMsg");
            Attribute<String> attr = channel.attr(attKey);
            attr.set(transferSB.toString());

            //上传
            channel.writeAndFlush(transferSB);

            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            getDispose().dispose(transferSB.toString(), null, ExceptionType.CONNECT_ERROR);
        }

    }
}
